package server

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/gogf/gf/util/gconv"

	"kratos_kafka/internal/conf"
	"kratos_kafka/internal/service"
	"kratos_kafka/internal/utils"
)

// DelayConsumer is the handler passed to the sarama consumer group for a
// delay topic. It forwards each message to sendTopic once the message is at
// least delay old.
type DelayConsumer struct {
	// greeterService is kept here so the handler can reach the business layer;
	// ConsumeClaim uses it to publish due messages.
	greeterService *service.GreeterService
	// delay is the minimum age (now minus message timestamp) a message must
	// reach before it is forwarded.
	delay          time.Duration
	// sendTopic is the topic that due messages are forwarded to.
	sendTopic      string
}

// NewDelayConsumer builds a DelayConsumer from the delay-queue section of the
// bootstrap configuration.
//
// Msg1DelaySecs is a plain number of seconds, so it is multiplied by
// time.Second to obtain the time.Duration stored in the consumer. RealTopic
// becomes the topic that due messages are forwarded to.
func NewDelayConsumer(bootConf *conf.Bootstrap, greeterService *service.GreeterService) *DelayConsumer {
	consumer := &DelayConsumer{greeterService: greeterService}

	// Notice: the configured value carries no unit; convert seconds -> Duration.
	consumer.delay = time.Second * time.Duration(bootConf.DelayQueue.SaramaKafka.Msg1DelaySecs)
	consumer.sendTopic = bootConf.DelayQueue.SaramaKafka.RealTopic

	return consumer
}

// ConsumeClaim reads messages from the claim and forwards every message that
// is already at least c.delay old to c.sendTopic, keeping the original key and
// value.
//
//   - Message is due and the send succeeds: the message is marked on the
//     session and the next message is read.
//   - Message is due but the send fails: the failure is printed and the method
//     returns nil without marking the message.
//   - Message is not due yet: the method returns nil immediately without
//     marking it. The return is deliberate: it hands control back to the
//     caller's polling loop instead of blocking inside this range loop.
//
// It always returns nil.
//
// Notice: for a detailed description of this usage pattern see
// https://juejin.cn/post/6999263126713696293#heading-4
func (c *DelayConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// TODO: JSON-decode the payload first if business handling is required.

		fmt.Printf("delay_topic1获取到的原始message:>>> delay: %v, topic: %v, partition: %v, offset: %v, key: %v, val: %v, timeStamp: %v, blockTimestamp: %v \n",
			c.delay,
			msg.Topic,
			msg.Partition,
			msg.Offset,
			gconv.String(msg.Key),
			gconv.String(msg.Value),
			gconv.String(msg.Timestamp),
			gconv.String(msg.BlockTimestamp),
		)

		// Not old enough yet: stop here so the outer polling loop takes over.
		age := utils.GetCurrentTime().Sub(msg.Timestamp)
		if age < c.delay {
			return nil
		}

		fmt.Printf("delay_topic1处理有效的数据:>>> topic: %v, partition: %v, offset: %v, key: %v, val: %v, timeStamp: %v, blockTimestamp: %v \n",
			msg.Topic,
			msg.Partition,
			msg.Offset,
			gconv.String(msg.Key),
			gconv.String(msg.Value),
			gconv.String(msg.Timestamp),
			gconv.String(msg.BlockTimestamp),
		)

		// The key is passed through unchanged, so key-based partition routing
		// on the real topic can be driven by whatever the producer put in Key.
		partition, offset, errSend := c.greeterService.Uc.SaramaSendMessageBiz(c.sendTopic, msg.Key, msg.Value)
		if errSend != nil {
			fmt.Printf("delay服务往 %v 发送消息失败! err: %v \n, ", c.sendTopic, errSend)
			return nil
		}

		// Notice: only mark the message after it has been forwarded.
		session.MarkMessage(msg, "")
		fmt.Printf("delay1往 %v 发送了有效的数据: partition: %v, offset: %v, key: %v, value: %v, timeStamp: %v  \n", c.sendTopic, partition, offset, string(msg.Key), string(msg.Value), msg.Timestamp)
	}
	return nil
}

// Setup is one of the handler methods sarama's consumer group expects.
// DelayConsumer has nothing to prepare, so it does nothing and returns nil.
func (c *DelayConsumer) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup is one of the handler methods sarama's consumer group expects.
// DelayConsumer holds no per-session resources, so it does nothing and
// returns nil.
func (c *DelayConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// NewSaramaKafkaDelayConsumerServer creates the sarama consumer group of the
// delay service and starts consuming DelayTopic1 in the background. Each
// delay interval is expected to have its own topic and consumer.
//
// The group is built from bootConf.DelayQueue.SaramaKafka (broker addresses
// Server.Addr, group id DelayGroup1) and starts at the newest offset when the
// group has no committed offset. Two goroutines are started:
//
//   - one drains the group's Errors() channel and prints every error;
//   - one calls Consume in a loop with a DelayConsumer as handler. Consume
//     returns whenever a session ends, so it has to be called again to keep
//     polling.
//
// The consume loop stops and the group is closed when the process receives
// SIGINT or SIGTERM, or when the group has been closed by the caller. A failed
// Consume call is retried after a short pause.
//
// It returns a pointer to the consumer group and panics if the group cannot
// be created.
//
// Notice: for connecting to Kafka over TLS see the code in
// tests/sarama_kafka_tls.
func NewSaramaKafkaDelayConsumerServer(bootConf *conf.Bootstrap, greeterService *service.GreeterService) *sarama.ConsumerGroup {
	// Pause between attempts after Consume reported an error, so a broken
	// connection does not turn the loop into a busy spin.
	const consumeRetryBackoff = time.Second

	// NewConfig initialises all the defaults.
	delayConsumerConfig := sarama.NewConfig()
	// Where to start when no committed offset exists for the group.
	delayConsumerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	//delayConsumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	delayConsumerGroup, errDelayConsumerGroup := sarama.NewConsumerGroup(
		bootConf.DelayQueue.SaramaKafka.Server.Addr,
		bootConf.DelayQueue.SaramaKafka.DelayGroup1,
		delayConsumerConfig,
	)
	if errDelayConsumerGroup != nil {
		panic(errDelayConsumerGroup)
	}

	// ctx is cancelled on SIGINT/SIGTERM. SIGKILL (os.Kill) cannot be trapped,
	// so it is not registered. Using ctx for Consume makes a blocked Consume
	// call return as soon as a stop signal arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

	// Drain the error channel in its own goroutine so errors are reported while
	// Consume is blocking. The range ends when Close closes the channel.
	go func() {
		for err := range delayConsumerGroup.Errors() {
			if err != nil {
				printMsgTest("delayConsumerGroup.Errors(): " + err.Error())
			}
		}
	}()

	// Custom handler whose ConsumeClaim carries the delay-queue logic.
	currDelayConsumer := NewDelayConsumer(bootConf, greeterService)
	topics := []string{bootConf.DelayQueue.SaramaKafka.DelayTopic1}

	go func() {
		// Release the signal registration and close the group on every exit
		// path so no goroutine or connection is leaked.
		defer stop()
		defer func() {
			if errClose := delayConsumerGroup.Close(); errClose != nil {
				fmt.Println("delayConsumerGroup.Close() err: ", errClose)
			}
		}()

		for ctx.Err() == nil {
			errConsume := delayConsumerGroup.Consume(ctx, topics, currDelayConsumer)
			if errConsume == nil {
				// Session ended normally (e.g. the handler returned); poll again.
				continue
			}
			if errors.Is(errConsume, sarama.ErrClosedConsumerGroup) {
				// The group was closed elsewhere; retrying can never succeed.
				return
			}
			fmt.Println("delayConsumerGroup消费消息发生错误! err: ", errConsume)

			select {
			case <-ctx.Done():
			case <-time.After(consumeRetryBackoff):
			}
		}
	}()

	return &delayConsumerGroup
}
